StepFunctionsのSendMessageBatchでSQSにメッセージ送信する
はじめに
StepFunctionsを使ってSQSに大量のメッセージを送信したかったのでAWS SDK integrationsのSendMessageBatchを試してみました。
ワークフローの全体像
全体像は以下の図の通りです。Lambda関数で作ったデータをSendMessageBatchでSQSに送信します。SendMessageBatchの仕様を踏まえたポイントは以下の通りです。
- SendMessageBatchでは各メッセージのIDを送信側で指定する必要があります
- SendMessageBatchで一度に送信できるメッセージは10件までです
- Mapで並列処理したいのでメッセージを10件ごとのネストしたリストにします
Lambda関数
上記を踏まえてダミーのデータを生成する以下のような関数を作りました。
※注: このコードだと実際に大量のデータを生成すると実行時間がかなり長くなります
import json import uuid def generate_messages(n:int): return [{'Id': str(uuid.uuid4()), 'MessageBody': str(uuid.uuid4()) } for _ in range(n)] def lambda_handler(event, context): return [generate_messages(10) for _ in range(5)]
この関数の実行結果は以下のようになります。リストの要素のキー名はSendMessageBatch
と一致するようにしています。
[ [ { "Id": "00ca4544-98ae-4f41-9437-c92e9f8fd8ee", "MessageBody": "fee67a60-1f64-4dab-a995-8cd8b221392b" }, { "Id": "6b249be1-a14f-43d5-b1f0-7770f221d427", "MessageBody": "5674575a-8c0f-45f3-b5c4-c19770022bf9" }, { "Id": "a72abc8d-130f-4053-a602-a1d2d43e3b41", "MessageBody": "401d8afb-17c4-4489-a37a-d65f07ddae5f" } ], [ { "Id": "c5cdfb14-fd78-42b2-a98f-3184bda7eefa", "MessageBody": "b273ebbf-5ba1-48ae-a03c-75494cb32542" }, { "Id": "73860e6a-31e4-4bbb-8a1c-57af5bb8244f", "MessageBody": "4415227d-717c-44eb-82c4-9a8e529acea1" }, { "Id": "3405839d-51af-403e-ac29-fcbb38bb34cd", "MessageBody": "a835abaf-df76-41eb-9f5c-c232f68855b9" } ] ]
メッセージ送信 State
メッセージ送信を行うStateは以下のようになります。今回指定したパラメータは以下の通りです。
- QueueUrl キューのURL
- Entries メッセージのリスト(Lambdaの出力のリストの1要素を指定)
{ "Type": "Map", "Iterator": { "StartAt": "SendMessageBatch", "States": { "SendMessageBatch": { "Type": "Task", "End": true, "y": { "Entries.$": "$", "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxx/MyQueue" }, "Resource": "arn:aws:states:::aws-sdk:sqs:sendMessageBatch" } } }, "End": true }
SendMessageBatchステートの実行結果は以下のようになります。
{ "name": "SendMessageBatch", "output": { "Successful": [ { "Id": "38633e7c-d26b-4c81-bdf3-b322f803b2f6", "Md5OfMessageBody": "c3683ad70e1323ca7d94f43bb9b93e48", "MessageId": "46c6e844-a260-4b6e-94f5-50a5c8f1a127" }, { "Id": "62c77d52-1224-4071-a08f-d610e348e809", "Md5OfMessageBody": "28a1384230bea8b9f2cddfe281a900d9", "MessageId": "5961faa8-50a9-41bc-9581-4e63eee0f092" }, { "Id": "56f40da7-977e-455f-8727-bf6a36849e2e", "Md5OfMessageBody": "5811b32e4a99ebf010e2e3126702b7ff", "MessageId": "b4fd172b-7399-4bd7-9f0b-f0c3020e02f6" }, { "Id": "59d07a20-ceee-4d0c-abb0-0bb70c2e78c0", "Md5OfMessageBody": "a252facc027f419d952fe9380c7ce0f8", "MessageId": "43a8fbd0-ea76-465d-8d02-3b022f7452ad" }, { "Id": "ab7cd096-9d24-4006-8d8b-e8436298fa01", "Md5OfMessageBody": "9cc21716708dfe67ce1f9d9ac7f49f7c", "MessageId": "b54f1739-e7b3-4956-a291-27302d2000a0" }, { "Id": "4bf9d1bf-4a5d-4090-9439-23dd79314017", "Md5OfMessageBody": "ad87092f60055e3e7eb56428dc12f3b0", "MessageId": "0f410434-d455-4ff3-b4d4-8d5040db6ee1" }, { "Id": "937d4b83-9cda-44bc-bbb9-83546780ccad", "Md5OfMessageBody": "4ec8b0276c957e7eb77f168f8bf14c7e", "MessageId": "df546f4d-7d7a-4df7-b587-d10e5321271b" }, { "Id": "cef988b1-b5a4-4f17-bd2d-00855f9f871c", "Md5OfMessageBody": "6417372c284ee74d55bef32a5aef1521", "MessageId": "78b6d202-8edc-41f7-845c-a8b6c2ffddac" }, { "Id": "e92ee03e-3f66-41af-96d3-c0d109a11a97", "Md5OfMessageBody": "1a919be2c5062a74d06ff8a54cdebf81", "MessageId": "5a6b008d-6f8b-4556-b4bf-ceffb1083d30" }, { "Id": "f3a32103-0056-4752-8197-51452431171a", "Md5OfMessageBody": "87208e026194f7200291f946b0833096", "MessageId": "c9b88071-299d-4468-a4d3-d9513ed516fb" } ] }, "outputDetails": { "truncated": false } }
まとめ
Lambda関数で生成したデータをSendMessageBatch のパラメータをどう指定したらいいのか少し悩みましたが、データをSendMessageBatchの仕様に合わせることで解決しました。Optimized integrationであるSendMessageとの性能の差やそれ以外の比較点が気になるので機会があれば比較してみたいと思います。